package jupitermouse.site.project.dataclean

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}

object TopNStatJob {

  /**
   * Entry point: loads the JSON access log and runs the TopN statistic jobs
   * for a fixed day.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TopNStatJob")
      // Keep partition columns as strings so `day` compares as "20170511", not an int.
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]").getOrCreate()

    // NOTE(review): hard-coded local Windows path — consider taking it from `args`.
    val accessDF = spark.read.json("file:///E:\\workroom\\learn\\spark\\access.json")
    accessDF.show()
    val day = "20170511"

    // TopN courses ranked by traffic (disabled)
//    videoTrafficsTopNStat(spark, accessDF, day)
    // TopN courses per city
    cityAccessTopNStat(spark, accessDF, day)
    // Most popular TopN courses by access count
    videoAccessTopNStat(spark, accessDF, day)

    spark.stop()
  }

  /**
   * TopN video courses ranked by total traffic for the given day,
   * computed both with the DataFrame API and with Spark SQL.
   *
   * @param spark    active session
   * @param accessDF cleaned access log (columns: day, cmsType, cmsId, traffic, ...)
   * @param day      day to report on, e.g. "20170511"
   */
  def videoTrafficsTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    import spark.implicits._

    // DataFrame API version. (Renamed from the misleading `cityAccessTopNDF`.)
    val videoTrafficsTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(sum("traffic").as("traffics"))
      .orderBy($"traffics".desc)
    // Was an empty println(); the computed result was never displayed.
    videoTrafficsTopNDF.show()

    // SQL version of the same query.
    accessDF.createOrReplaceTempView("log")
    spark.sql(
      s"""select * from
         |(select cmsId, sum(traffic) traffics from log
         | where cmsType = 'video' and day = '$day'
         | group by cmsId) s
         |order by traffics desc, cmsId""".stripMargin).show()
  }

  /**
   * TopN video courses per city for the given day, ranked by traffic within
   * each city.
   *
   * @param spark    active session
   * @param accessDF cleaned access log
   * @param day      day to report on
   */
  def cityAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    import spark.implicits._

    // DataFrame API: access counts per (day, city, cmsId). Was computed but never shown.
    val cityAccessTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "city", "cmsId")
      .agg(count("cmsId").as("times"))
    cityAccessTopNDF.show()

    accessDF.createOrReplaceTempView("log")

    // BUG FIX: the SQL hard-coded day = "20170511", silently ignoring the `day` parameter.
    spark.sql(
      s"""select city, traffics, cmsId from
         |(select city, cmsId, sum(traffic) traffics from log
         | where cmsType = 'video' and day = '$day'
         | group by city, cmsId) s
         |order by city, traffics desc""".stripMargin).show()
  }

  /**
   * Most popular TopN video courses (by access count) for the given day.
   * Intended downstream flow: hive -> mysql.
   *
   * @param spark    active session
   * @param accessDF cleaned access log
   * @param day      day to report on
   */
  def videoAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    import spark.implicits._

    // DataFrame API version. Was computed but never shown.
    val videoAccessTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)
    videoAccessTopNDF.show()

    accessDF.createOrReplaceTempView("log")
    // BUG FIX: the entire `spark.sql("...").show()` expression was pasted inside a
    // string literal and submitted as SQL, which cannot parse. Run the real query,
    // filtered consistently with the DataFrame version above.
    spark.sql(
      s"""select day, cmsId, count(1) as times from log
         |where cmsType = 'video' and day = '$day'
         |group by day, cmsId
         |order by times desc""".stripMargin).show()
  }

}
